package cn.ticsmyc.tools.multiThread.producerConsumer;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

/**
 * A channel backed by several queues that lets a consumer "steal" work from
 * other queues when its own preferred queue has nothing to offer.
 *
 * <p>Producers are routed to a queue by the product's hash code. Consumers
 * name a preferred queue and fall back to the remaining queues when the
 * preferred one is empty.
 *
 * @param <P> the type of product carried by the channel
 * @author Ticsmyc
 * @date 2021-06-02 16:59
 */
public class WorkStealingChannel<P> implements WorkStealingEnabledChannel<P> {

    private final BlockingQueue<P>[] managedQueues;

    /**
     * Creates a channel on top of the given queues. The array is used as-is
     * (it is not copied).
     *
     * @param managedQueues the queues to manage; must contain at least one queue
     * @throws IllegalArgumentException if {@code managedQueues} is null or empty
     */
    public WorkStealingChannel(BlockingQueue<P>[] managedQueues) {
        if (managedQueues == null || managedQueues.length == 0) {
            throw new IllegalArgumentException("managedQueues must contain at least one queue");
        }
        this.managedQueues = managedQueues;
    }

    /**
     * Creates a channel with {@code queueNum} unbounded
     * {@link LinkedBlockingDeque} instances.
     *
     * @param queueNum number of queues to create; must be positive
     * @throws IllegalArgumentException if {@code queueNum} is not positive
     */
    public WorkStealingChannel(int queueNum) {
        if (queueNum <= 0) {
            throw new IllegalArgumentException("queueNum must be positive: " + queueNum);
        }
        // Generic array creation is not allowed in Java; the raw array is safe
        // here because only BlockingQueue<P> instances are ever stored in it.
        @SuppressWarnings("unchecked")
        BlockingQueue<P>[] queues = new BlockingQueue[queueNum];
        for (int i = 0; i < queues.length; i++) {
            queues[i] = new LinkedBlockingDeque<P>();
        }
        this.managedQueues = queues;
    }

    /**
     * Puts the product into the queue selected by its hash code.
     *
     * @param product the product to enqueue; must not be null
     * @throws InterruptedException if interrupted while waiting for space in the queue
     * @throws NullPointerException if {@code product} is null
     */
    @Override
    public void put(P product) throws InterruptedException {
        // hashCode() may be negative; floorMod always yields an index in [0, length).
        int index = Math.floorMod(product.hashCode(), managedQueues.length);
        BlockingQueue<P> targetQueue = managedQueues[index];
        targetQueue.put(product);
    }

    /**
     * Takes a product, preferring the queue identified by {@code queueId}.
     * If that id is invalid or the queue is empty, the other queues are
     * scanned in index order and the first available product is returned.
     * This method never waits for a product to arrive.
     *
     * @param queueId index of the preferred queue; out-of-range values are tolerated
     * @return a product, or {@code null} if every queue is empty
     * @throws InterruptedException if the calling thread has been interrupted
     */
    @Override
    public P take(int queueId) throws InterruptedException {
        // poll() never blocks and therefore never reports interruption itself;
        // check explicitly so consumer loops that are stopped via interrupt
        // still terminate.
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }

        boolean validId = queueId >= 0 && queueId < this.managedQueues.length;
        if (validId) {
            P preferred = pollOrNull(this.managedQueues[queueId]);
            if (preferred != null) {
                return preferred;
            }
        }

        // Steal from the remaining queues, first non-empty one wins.
        for (int index = 0; index < this.managedQueues.length; index++) {
            if (index == queueId) {
                continue; // already tried above
            }
            P stolen = pollOrNull(this.managedQueues[index]);
            if (stolen != null) {
                return stolen;
            }
        }
        return null;
    }

    /**
     * Takes a product, starting from a queue chosen from the current time
     * so that callers without a preferred queue are spread across queues.
     *
     * @return a product, or {@code null} if every queue is empty
     * @throws InterruptedException if the calling thread has been interrupted
     */
    @Override
    public P take() throws InterruptedException {
        // Reduce the long first, then narrow: casting the timestamp to int
        // before the modulus can produce a negative index.
        int startIndex = (int) Math.floorMod(System.currentTimeMillis(), (long) this.managedQueues.length);
        return take(startIndex);
    }

    /** Polls the queue without blocking; a null queue slot counts as empty. */
    private P pollOrNull(BlockingQueue<P> queue) {
        return queue == null ? null : queue.poll();
    }
}
